1. Data Preparation
The task this time is to implement machine translation. Data: http://www.manythings.org/anki/ has Chinese-English pairs; look for cmn-eng/cmn.txt, which uses Simplified Chinese characters. The implementation is adapted, with some modifications, from code in the 'NLP 100天馬拉松' course I took earlier.
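If you want to fetch the file from a script instead of clicking through the page, here is a minimal download sketch. The archive name cmn-eng.zip and the extraction path are my assumptions; if the request is refused, downloading and unzipping manually in a browser works just as well.
import io, zipfile, urllib.request
# Hypothetical download helper: grabs cmn-eng.zip (assumed archive name) and extracts cmn.txt
url = 'http://www.manythings.org/anki/cmn-eng.zip'
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})   # some servers reject the default script user agent
with urllib.request.urlopen(req) as resp:
    with zipfile.ZipFile(io.BytesIO(resp.read())) as zf:
        zf.extract('cmn.txt', path='your_path/')   # same folder used as data_dir below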
2. Preparing the Data
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import csv
import numpy as np
import re
import random
import math
import time
import spacy
from torchtext.data import Field, BucketIterator, TabularDataset
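Note: Field, BucketIterator and TabularDataset belong to torchtext's legacy API, so the import above only works on older torchtext (before 0.9). On torchtext 0.9-0.11 the same classes are available as from torchtext.legacy.data import Field, BucketIterator, TabularDataset, and in later releases they were removed altogether, so pin an older torchtext version if this import fails.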
data_dir = 'your_path/'
lines = open(data_dir + 'cmn.txt' , encoding='utf-8').read().strip().split('\n')
trnslt_pairs = [[s for s in l.split('\t')] for l in lines ]
print ("Sample: " , trnslt_pairs[1000][0:2] )
print ("Total records:" , len(trnslt_pairs))
# Sample: ['He was drowned.', '他被淹死了。']
# Total records: 24360
# Filter: keep only pairs whose Chinese sentence starts with 我/你/他/她, which trims the corpus down to a smaller subset
trnslt_pairs = [pair for pair in trnslt_pairs if pair[1][0] in ['我','你','他','她']]
print (f"Total records: {len(trnslt_pairs)}")
# create train / validation / test splits
train, test = train_test_split(trnslt_pairs, test_size=0.09)
train, val = train_test_split(train, test_size=0.08)
print (f"training data:{len(train)} , develop data: {len(val)} , testing data: {len(test)}")
def write_csv(trn_data, file_path):
with open(file_path ,'w', newline='', encoding='utf-8') as fout:
writer = csv.writer (fout)
for itm in trn_data:
writer.writerow ([itm[0],itm[1]])
file_path = data_dir + 'train.csv'
write_csv(train, file_path )
file_path = data_dir + 'val.csv'
write_csv(val, file_path )
file_path = data_dir + 'test.csv'
write_csv(test, file_path )
spacy_eng = spacy.load('en_core_web_sm')
def tokenize_for_en(text):
    # add a space before ., ! and ? so the punctuation becomes its own token
    text = re.sub(r"([.!?])", r" \1", text)
    return [tok.text for tok in spacy_eng.tokenizer(text)]
def tokenize_for_ch(text):
    # drop everything that is not a Chinese character, a letter or a digit, then split into single characters
    regex = re.compile(r'[^\u4e00-\u9fa5A-Za-z0-9]')
    text = regex.sub(' ', text)
    return [word for word in text if word.strip()]
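A quick sanity check of the two tokenizers on the sample pair printed earlier; the comments show the output I would expect (the English side goes through spaCy, so the exact tokens depend on the installed model):
print(tokenize_for_en('He was drowned.'))   # expected: ['He', 'was', 'drowned', '.']
print(tokenize_for_ch('他被淹死了。'))        # expected: ['他', '被', '淹', '死', '了'] -- character-level tokens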
target_en = Field(tokenize = tokenize_for_en,
                  init_token = '<sos>', eos_token = '<eos>',
                  lower = True, batch_first = True)
source_ch = Field(tokenize = tokenize_for_ch,
                  init_token = '<sos>', eos_token = '<eos>',
                  lower = True, batch_first = True)
train_dataset, dev_dataset, test_dataset = TabularDataset.splits(
    path = data_dir , format = 'csv', skip_header = False,   # the CSV files written above have no header row
train='train.csv', validation='val.csv', test='test.csv',
fields=[
('trg', target_en),
('src', source_ch)
]
)
source_ch.build_vocab(train_dataset, min_freq = 1) # min_freq can be tuned
target_en.build_vocab(train_dataset, min_freq = 1)
print ("Chinese (source) vocab size: " , len(source_ch.vocab) , ", English (target) vocab size: " , len(target_en.vocab))
print ("Sample SRC:", test_dataset[0].src , "TRG:", test_dataset[0].trg)
BATCH_SIZE = 128
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
(train_dataset, dev_dataset, test_dataset),
batch_size = BATCH_SIZE,
sort_within_batch = True,
sort_key = lambda x : len(x.src),
device = device)
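A quick look at one batch from the iterator, just to confirm the shapes: with batch_first=True both tensors are [batch_size, seq_len], and seq_len varies per batch because BucketIterator groups sentences of similar length together.
batch = next(iter(train_iterator))
print(batch.src.shape, batch.trg.shape)   # e.g. torch.Size([128, src_len]) torch.Size([128, trg_len])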
3. Encoder
class TransformerEncoder(nn.Module):
def __init__(self, hidden_dim, feedforward_dim, n_enc_layers,
n_attn_heads, dropout, src_voca_length, max_pos_length ,
device):
"""
hidden_dim: embedding size
        feedforward_dim: dimension of the position-wise feedforward layer
        n_enc_layers: number of encoder layers
        n_attn_heads: number of attention heads
        dropout: dropout rate
        src_voca_length: size of the source vocabulary (here, the Chinese vocab)
        max_pos_length: maximum sequence length supported (used for the position embedding)
"""
super().__init__()
self.device = device
        # token (character) embedding
self.src_tok_embedding = nn.Embedding(src_voca_length , hidden_dim)
# position embedding
self.src_pos_embedding = nn.Embedding(max_pos_length, hidden_dim)
        # build a stack of n_enc_layers Transformer encoder layers
self.transformer_encoder_layers = nn.ModuleList([TransformerEncoderLayer(
hidden_dim,
feedforward_dim,
n_enc_layers,
n_attn_heads,
dropout,
device) for _ in range(n_enc_layers)])
self.dropout = nn.Dropout(dropout)
def forward(self, src_sentence, src_mask):
"""
src_sentence: [batch_size, src_len]
        src_mask: [batch_size, 1, 1, src_len]  (1 for real tokens, 0 for padding)
"""
batch_size = src_sentence.shape[0]
src_len = src_sentence.shape[1]
        # position indices 0 .. src_len-1, repeated for every example in the batch
        # pos [batch_size, src_len]
pos = torch.arange(0, src_len).unsqueeze(0).repeat(batch_size, 1).to(self.device)
        # add the token embedding and the position embedding, then apply dropout
# src_sentence [batch_size, src_len, hid_dim]
src_sentence = self.dropout(self.src_tok_embedding(src_sentence) + self.src_pos_embedding(pos))
        # run the embeddings through the n_enc_layers encoder layers,
        # feeding each layer's output into the next layer
        encoder_hidden = src_sentence
        for layer in self.transformer_encoder_layers:
            encoder_hidden, encoder_self_attention = layer(encoder_hidden, src_mask)
        # return the last layer's hidden states and its self-attention weights
# encoder_hidden [batch_size, src_len, hid_dim]
# encoder_self_attention [batch_size , attention_heads, src_len, src_len]
return encoder_hidden , encoder_self_attention
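The src_mask expected above is the usual padding mask; it has to broadcast against the [batch_size, n_heads, query_len, key_len] attention scores, hence the [batch_size, 1, 1, src_len] shape. A sketch of how it is typically built (the mask creation normally lives in the full Seq2Seq model, which comes in the next part):
SRC_PAD_IDX = source_ch.vocab.stoi[source_ch.pad_token]
src_batch = next(iter(train_iterator)).src                         # [batch_size, src_len] tensor of token ids
src_mask = (src_batch != SRC_PAD_IDX).unsqueeze(1).unsqueeze(2)    # [batch_size, 1, 1, src_len]
print(src_mask.shape)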
class TransformerEncoderLayer(nn.Module):
def __init__(self, hidden_dim , feedforward_dim, n_enc_layers, n_attn_heads, dropout , device):
"""
hidden_dim: embedding size
        feedforward_dim: dimension of the position-wise feedforward layer
"""
super().__init__()
        # multi-head self-attention sub-layer
self.self_attention_sublayer = MultiHeadAttentionSubLayer(hidden_dim, n_attn_heads, dropout, device)
# layer norm
self.self_attn_layernorm = nn.LayerNorm(hidden_dim)
        # position-wise feedforward sub-layer
self.feedforward_sublayer = PosFeedForwardSubLayer(hidden_dim,feedforward_dim,dropout)
# layer norm
self.feedforward_layernorm = nn.LayerNorm(hidden_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, src_embedding, src_mask):
"""
src_embedding: [batch_size, src_len, hid_dim]
        src_mask: [batch_size, 1, 1, src_len]
"""
        # self-attention: Q, K and V all come from the same src_embedding
_src, encoder_self_attention = self.self_attention_sublayer(src_embedding, src_embedding, src_embedding, src_mask)
        # dropout, residual connection and layer norm
# src_embedding [batch_size, src_len, hid_dim]
src_embedding = self.self_attn_layernorm(src_embedding + self.dropout(_src))
# positionwise feedforward
_src = self.feedforward_sublayer(src_embedding)
        # dropout, residual connection and layer norm
src_embedding = self.feedforward_layernorm(src_embedding + self.dropout(_src))
        # return the updated hidden states and the encoder self-attention weights
        # src_embedding [batch_size, src_len, hid_dim]
        # encoder_self_attention [batch_size, attention_heads, src_len, src_len]
return src_embedding , encoder_self_attention
class MultiHeadAttentionSubLayer(nn.Module):
def __init__(self, hidden_dim , n_attn_heads, dropout, device):
"""
hidden_dim: embedding size
        n_attn_heads: number of attention heads
        dropout: dropout rate
"""
super().__init__()
        # the hidden dimension must be divisible by the number of attention heads
        assert hidden_dim % n_attn_heads == 0
        # hidden layer dimension
        self.hidden_dim = hidden_dim
        # number of attention heads
        self.n_attn_heads = n_attn_heads
        # dimension handled by each head
        self.head_dim = hidden_dim // n_attn_heads
        # linear projections Wq, Wk, Wv
        self.full_conn_q = nn.Linear(hidden_dim, hidden_dim)
        self.full_conn_k = nn.Linear(hidden_dim, hidden_dim)
        self.full_conn_v = nn.Linear(hidden_dim, hidden_dim)
        # final output projection Wo
        self.full_conn_o = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout)
        # scaling factor sqrt(head_dim): keeps the Q·K dot products from growing too large and saturating the softmax
        self.scale = torch.sqrt(torch.FloatTensor([self.head_dim])).to(device)
def forward(self, query_input, key_input, value_input, mask = None):
"""
        query_input: q [batch_size, q_len, hid_dim]
        key_input:   k [batch_size, k_len, hid_dim]
        value_input: v [batch_size, v_len, hid_dim]
"""
batch_size = query_input.shape[0]
        # project the inputs: Wq*q -> Q, Wk*k -> K, Wv*v -> V
# Q [batch size, query len, hid dim]
# K [batch size, key len, hid dim]
# V [batch size, value len, hid dim]
Q = self.full_conn_q(query_input)
K = self.full_conn_k(key_input)
V = self.full_conn_v(value_input)
        # reshape so the hidden dimension is split evenly across the attention heads
def split_attention(Q, K, V, num_atn_head, head_dim, batch_size):
Q = Q.view(batch_size, -1, num_atn_head, head_dim)
K = K.view(batch_size, -1, num_atn_head, head_dim)
V = V.view(batch_size, -1, num_atn_head, head_dim)
return Q, K, V
        # swap dimensions 1 and 2 so the head dimension comes before the sequence dimension, separating the heads
        def separate_heads(Q, K, V):
            Q = Q.permute(0, 2, 1, 3) # (batch_size, num_atn_head, query_len, head_dim)
            K = K.permute(0, 2, 1, 3) # (batch_size, num_atn_head, key_len, head_dim)
            V = V.permute(0, 2, 1, 3) # (batch_size, num_atn_head, value_len, head_dim)
            return Q , K , V
        Q, K, V = split_attention(Q, K, V, self.n_attn_heads, self.head_dim, batch_size)
        Q, K, V = separate_heads(Q, K, V)
        # transpose the last two dimensions of K, then compute Q·K^T and divide by the scale
        # scaled_dot_product_similarity [batch_size, n_heads, query_len, key_len]
        scaled_dot_product_similarity = torch.matmul(Q, K.permute(0, 1, 3, 2)) / self.scale
        # apply the mask: positions where mask == 0 get a very large negative score
        if mask is not None:
            scaled_dot_product_similarity = scaled_dot_product_similarity.masked_fill(mask == 0, -1e10)
        # softmax over the last dimension; attention [batch_size, n_heads, query_len, key_len]
        attention = torch.softmax(scaled_dot_product_similarity, dim = -1)
        # weight V by the attention scores
        # x [batch_size, n_heads, query_len, head_dim]
        x = torch.matmul(self.dropout(attention), V)
        # move the head dimension back so the heads can be merged; x [batch_size, query_len, n_heads, head_dim]
        x = x.permute(0, 2, 1, 3).contiguous()
        # x [batch_size, query_len, hid_dim]
        x = x.view(batch_size, -1, self.hidden_dim)
        # final output projection; x [batch_size, query_len, hid_dim]
        x = self.full_conn_o(x)
return x, attention
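A standalone shape check for the multi-head attention sub-layer (the sizes are arbitrary; it just confirms that the per-head computation softmax(Q·K^T / sqrt(head_dim))·V gives back [batch_size, seq_len, hidden_dim] plus one attention map per head):
mha = MultiHeadAttentionSubLayer(hidden_dim=256, n_attn_heads=8, dropout=0.1, device=device).to(device)
x = torch.randn(2, 10, 256, device=device)   # [batch_size=2, seq_len=10, hidden_dim=256]
out, attn = mha(x, x, x)                     # self-attention, no mask
print(out.shape, attn.shape)                 # torch.Size([2, 10, 256]) torch.Size([2, 8, 10, 10])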
class PosFeedForwardSubLayer(nn.Module):
def __init__(self, hidden_dim, ff_dim, dropout):
super().__init__()
self.full_conn_1 = nn.Linear(hidden_dim, ff_dim)
self.full_conn_2 = nn.Linear(ff_dim, hidden_dim)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
        # expand the last dimension from hid_dim to ff_dim; x [batch_size, seq_len, ff_dim]
        x = self.dropout(torch.relu(self.full_conn_1(x)))
        # project back down to hid_dim; x [batch_size, seq_len, hid_dim]
        x = self.full_conn_2(x)
return x
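To tie the pieces together, a minimal end-to-end sketch of the encoder on one training batch. The hyperparameter values are arbitrary assumptions (and max_pos_length=100 assumes no source sentence exceeds 100 tokens); the padding mask is built the same way as sketched after the TransformerEncoder class:
enc = TransformerEncoder(hidden_dim=256, feedforward_dim=512, n_enc_layers=3,
                         n_attn_heads=8, dropout=0.1, src_voca_length=len(source_ch.vocab),
                         max_pos_length=100, device=device).to(device)
SRC_PAD_IDX = source_ch.vocab.stoi[source_ch.pad_token]
batch = next(iter(train_iterator))
src_mask = (batch.src != SRC_PAD_IDX).unsqueeze(1).unsqueeze(2)   # [batch_size, 1, 1, src_len]
enc_out, enc_attn = enc(batch.src, src_mask)
print(enc_out.shape, enc_attn.shape)   # e.g. [128, src_len, 256] and [128, 8, src_len, src_len]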
I'll stop here today with the encoder built. The code is really quite long, so I'm splitting this write-up across two days.